-
Notifications
You must be signed in to change notification settings - Fork 15.9k
Resolve OOM When Reading Large Logs in Webserver #49470
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resolve OOM When Reading Large Logs in Webserver #49470
Conversation
d47a49b to
2f5a202
Compare
4e1bcb4 to
358f231
Compare
358f231 to
9900b2e
Compare
Backport failed to create: v3-0-test. View the failure log Run details
You can attempt to backport this manually by running: cherry_picker ee54fe9 v3-0-testThis should apply the commit to the v3-0-test branch and leave the commit in conflict state marking After you have resolved the conflicts, you can continue the backport process by running: cherry_picker --continue |
|
I marked it for backport - but (as expected) it failed automatic cherry-picking - but you might want to attempt to fix the conflict and see if it can be merged to v3-0-tests ^^ instructions above. |
|
Congrats 🎉 |
|
How am I able to get a container image based the new main branch including this fix so that I can test this in a Kubernetes cluster? We already have a Dockerfile which uses a released Airflow container image a base |
|
Thanks everyone for your help reviewing this PR along the way!
Will do! I’ll continue working on resolving the conflicts and push the fix to the
The fastest way in my mind is:
|
|
#protm |
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server as return stream
* Refactor _interleave_logs with K-Way Merge
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open closed file error
* Refactor LogReader by yielding in batch
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp
* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
- test_file_task_handler_when_ti_value_is_invalid
- test_file_task_handler
- test_file_task_handler_running
- test_file_task_handler_rotate_size_limit
- test__read_when_local
- test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
- test_interleave_interleaves
- test_interleave_logs_correct_ordering
- test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
- test__stream_lines_by_chunk
- test__log_stream_to_parsed_log_stream
- test__sort_key
- test__is_sort_key_with_default_timestamp
- test__is_logs_stream_like
- test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream
* Fix amazon task_handler test
* Fix wask task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_hander convert_list_to_stream
* Fix compact tests
* Refactor es,os task handler for 3.0 compact
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat test for celery, wasb
Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat test for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" with backward "log_pos" for tests
- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores in self review
- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
- Fix doc string by removing outdate note
- Only add buffer for full_download
- Add test ndjson format for get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id not found compact for es_task_handler
* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simply for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
- use itertools.isslice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Replace _sort_key as _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
- _is_logs_stream_like
- add type annotation
- reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypeDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
- refactor flush logic
- make totoal_lines as property
- make stream as property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for ingore LogMetadata TypeDict
* Add comment for offset; Fix commet for LogMessages
* Refactor with from_iterable, islice
* Fix nits in test
- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-strign in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
|
Wowo what a long story to get this merged! Impressive! Looking forward to test this. As it is a major change I'd recommend NOT backporting to 3.0. We had a couple of rc's and still ongoing voting. I think it is good to have this in 3.1 as a feature increment. It is not really a bug fix helping for stability and I see rather the risk breaking in a minor patch. |
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server as return stream
* Refactor _interleave_logs with K-Way Merge
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open closed file error
* Refactor LogReader by yielding in batch
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp
* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
- test_file_task_handler_when_ti_value_is_invalid
- test_file_task_handler
- test_file_task_handler_running
- test_file_task_handler_rotate_size_limit
- test__read_when_local
- test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
- test_interleave_interleaves
- test_interleave_logs_correct_ordering
- test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
- test__stream_lines_by_chunk
- test__log_stream_to_parsed_log_stream
- test__sort_key
- test__is_sort_key_with_default_timestamp
- test__is_logs_stream_like
- test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream
* Fix amazon task_handler test
* Fix wask task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_hander convert_list_to_stream
* Fix compact tests
* Refactor es,os task handler for 3.0 compact
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat test for celery, wasb
Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat test for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" with backward "log_pos" for tests
- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores in self review
- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
- Fix doc string by removing outdate note
- Only add buffer for full_download
- Add test ndjson format for get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id not found compact for es_task_handler
* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simply for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
- use itertools.isslice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Replace _sort_key as _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
- _is_logs_stream_like
- add type annotation
- reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypeDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
- refactor flush logic
- make totoal_lines as property
- make stream as property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for ingore LogMetadata TypeDict
* Add comment for offset; Fix commet for LogMessages
* Refactor with from_iterable, islice
* Fix nits in test
- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-strign in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
(cherry picked from commit ee54fe9)
Actually maybe - since 3.0.3 is about to be released - if we are able to test it in the next few weeks - I would be for trying to backport it for 3.0.4. There are a number of users who experienced this issue and we could even make an interesting approach - we could release 3.0.4 b1 with only that change applied and ask the users who had this problem to run it (on their own risk of course). We have never done that before other than for X.0.0 - but there is nothing stopoing us from doing it - and then we could give it a few weeks to test it. This way we might be able to give it in the hands of users in 3.0.4 - possibly a month before 3.1.0 (and also 3.1.0 will likely have other changes that might cause other issues). I think this is one of those cases where - while massive - this is essentially a bug-fix, not a new feature and it has quite an impact. But - it is not a must of course. Just one of possibilities I see. |
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server as return stream
* Refactor _interleave_logs with K-Way Merge
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open closed file error
* Refactor LogReader by yielding in batch
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp
* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
- test_file_task_handler_when_ti_value_is_invalid
- test_file_task_handler
- test_file_task_handler_running
- test_file_task_handler_rotate_size_limit
- test__read_when_local
- test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
- test_interleave_interleaves
- test_interleave_logs_correct_ordering
- test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
- test__stream_lines_by_chunk
- test__log_stream_to_parsed_log_stream
- test__sort_key
- test__is_sort_key_with_default_timestamp
- test__is_logs_stream_like
- test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream
* Fix amazon task_handler test
* Fix wask task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_hander convert_list_to_stream
* Fix compact tests
* Refactor es,os task handler for 3.0 compact
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat test for celery, wasb
Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat test for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" with backward "log_pos" for tests
- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores in self review
- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
- Fix doc string by removing outdate note
- Only add buffer for full_download
- Add test ndjson format for get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id not found compact for es_task_handler
* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simply for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
- use itertools.isslice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Replace _sort_key as _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
- _is_logs_stream_like
- add type annotation
- reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypeDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
- refactor flush logic
- make totoal_lines as property
- make stream as property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for ingore LogMetadata TypeDict
* Add comment for offset; Fix commet for LogMessages
* Refactor with from_iterable, islice
* Fix nits in test
- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-strign in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
(cherry picked from commit ee54fe9)
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server as return stream
* Refactor _interleave_logs with K-Way Merge
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open closed file error
* Refactor LogReader by yielding in batch
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp
* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
- test_file_task_handler_when_ti_value_is_invalid
- test_file_task_handler
- test_file_task_handler_running
- test_file_task_handler_rotate_size_limit
- test__read_when_local
- test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
- test_interleave_interleaves
- test_interleave_logs_correct_ordering
- test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
- test__stream_lines_by_chunk
- test__log_stream_to_parsed_log_stream
- test__sort_key
- test__is_sort_key_with_default_timestamp
- test__is_logs_stream_like
- test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream
* Fix amazon task_handler test
* Fix wask task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_hander convert_list_to_stream
* Fix compact tests
* Refactor es,os task handler for 3.0 compact
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat test for celery, wasb
Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat test for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" with backward "log_pos" for tests
- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores in self review
- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
- Fix doc string by removing outdate note
- Only add buffer for full_download
- Add test ndjson format for get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id not found compact for es_task_handler
* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simply for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
- use itertools.isslice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Replace _sort_key as _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
- _is_logs_stream_like
- add type annotation
- reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypeDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
- refactor flush logic
- make totoal_lines as property
- make stream as property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for ingore LogMetadata TypeDict
* Add comment for offset; Fix commet for LogMessages
* Refactor with from_iterable, islice
* Fix nits in test
- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-strign in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
(cherry picked from commit ee54fe9)
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server as return stream
* Refactor _interleave_logs with K-Way Merge
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open closed file error
* Refactor LogReader by yielding in batch
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp
* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
- test_file_task_handler_when_ti_value_is_invalid
- test_file_task_handler
- test_file_task_handler_running
- test_file_task_handler_rotate_size_limit
- test__read_when_local
- test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
- test_interleave_interleaves
- test_interleave_logs_correct_ordering
- test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
- test__stream_lines_by_chunk
- test__log_stream_to_parsed_log_stream
- test__sort_key
- test__is_sort_key_with_default_timestamp
- test__is_logs_stream_like
- test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream
* Fix amazon task_handler test
* Fix wask task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_hander convert_list_to_stream
* Fix compact tests
* Refactor es,os task handler for 3.0 compact
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat test for celery, wasb
Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat test for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" with backward "log_pos" for tests
- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores in self review
- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
- Fix doc string by removing outdate note
- Only add buffer for full_download
- Add test ndjson format for get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id not found compact for es_task_handler
* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simply for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
- use itertools.isslice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Replace _sort_key as _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
- _is_logs_stream_like
- add type annotation
- reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypeDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
- refactor flush logic
- make totoal_lines as property
- make stream as property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for ingore LogMetadata TypeDict
* Add comment for offset; Fix commet for LogMessages
* Refactor with from_iterable, islice
* Fix nits in test
- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-strign in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
(cherry picked from commit ee54fe9)
* Add note for new usage of LogMetadata
* Add _stream_parsed_lines_by_chunk
* Refactor _read_from_local/logs_server as return stream
* Refactor _interleave_logs with K-Way Merge
* Add _get_compatible_log_stream
* Refactor _read method to return stream with compatible interface
- Add compatible interface for executor, remote_logs
- Refactor skip log_pos with skip for each log source
* Refactor log_reader to adapt stream
* Fix _read_from_local open closed file error
* Refactor LogReader by yielding in batch
* Add ndjson header to get_log openapi schema
* Fix _add_log_from_parsed_log_streams_to_heap
- Add comparator for StructuredLogMessage
- Refactor parsed_log_streams from list to dict for removing empty logs
* Fix _interleave_logs dedupe logic
- should check the current logs with default timestamp
* Refactor test_log_handlers
- Fix events utils
- Add convert_list_to_stream, mock_parsed_logs_factory utils
- Fix the following test after refactoring FileTaskHandler
- test_file_task_handler_when_ti_value_is_invalid
- test_file_task_handler
- test_file_task_handler_running
- test_file_task_handler_rotate_size_limit
- test__read_when_local
- test__read_served_logs_checked_when_done_and_no_local_or_remote_logs
- test_interleave_interleaves
- test_interleave_logs_correct_ordering
- test_interleave_logs_correct_dedupe
- Add new test for refactoring FileTaskHandler
- test__stream_lines_by_chunk
- test__log_stream_to_parsed_log_stream
- test__sort_key
- test__is_sort_key_with_default_timestamp
- test__is_logs_stream_like
- test__add_log_from_parsed_log_streams_to_heap
* Move test_log_handlers utils to test_common
* Fix unit/celery/log_handlers test
* Fix mypy-providers static check
* Fix _get_compatible_log_stream
- sequential yield instead of parallel yield for all log_stream
* Fix amazon task_handler test
* Fix wask task handler test
* Fix elasticsearch task handler test
* Fix opensearch task handler test
* Fix TaskLogReader buffer
- don't concat buffer with empty str, yield directly from buffer
* Fix test_log_reader
* Fix CloudWatchRemoteLogIO.read mypy
* Fix test_gcs_task_handler
* Fix core_api test_log
* Fix CloudWatchRemoteLogIO._event_to_str dt format
* Fix TestCloudRemoteLogIO.test_log_message
* Fix es/os task_hander convert_list_to_stream
* Fix compact tests
* Refactor es,os task handler for 3.0 compact
* Fix compat for RedisTaskHandler
* Fix ruff format for test_cloudwatch_task_handler after rebase
* Fix 2.10 compat TestCloudwatchTaskHandler
* Fix 3.0 compat test for celery, wasb
Fix wasb test, spelling
* Fix 3.0 compat test for gcs
* Fix 3.0 compat test for cloudwatch, s3
* Set get_log API default response format to JSON
* Remove "first_time_read" key in log metadata
* Remove "<source>_log_pos" key in log metadata
* Add LogStreamCounter for backward compatibility
* Remove "first_time_read" with backward "log_pos" for tests
- test_log_reader
- test_log_handlers
- test_cloudwatch_task_handler
- test_s3_task_handler
- celery test_log_handler
- test_gcs_task_handler
- test_wasb_task_handler
- fix redis_task_handler
- fix log_pos
* Fix RedisTaskHandler compatibility
* Fix chores in self review
- Fix typo in _read_from_logs_server
- Remove unused parameters in _stream_lines_by_chunk
- read_log_stream
- Fix doc string by removing outdate note
- Only add buffer for full_download
- Add test ndjson format for get_log API
* Fine-tune HEAP_DUMP_SIZE
* Replace get_compatible_output_log_stream with iter
* Remove buffer in log_reader
* Fix log_id not found compact for es_task_handler
* Fix review comments
- rename LogStreamCounter as LogStreamAccumulator
- simply for-yield with yield-from in log_reader
- add type annotation for LogStreamAccumulator
* Refactor LogStreamAccumulator._capture method
- use itertools.isslice to get chunk
* Fix type hint, joinedload for ti.dag_run after merge
* Replace _sort_key as _create_sort_key
* Add _flush_logs_out_of_heap common util
* Fix review nits
- _is_logs_stream_like
- add type annotation
- reduce to 1 isinstance call
- construct log_streams in _get_compatible_log_stream inline
- use TypeDict for LogMetadata
- remove len(logs) to check empty
- revert typo of self.log_handler.read in log_reader
- log_stream_accumulator
- refactor flush logic
- make totoal_lines as property
- make stream as property
* Fix mypy errors after merge
* Fix redis task handler test
* Refactor _capture logic in LogStreamAccumulator
* Add comments for ingore LogMetadata TypeDict
* Add comment for offset; Fix commet for LogMessages
* Refactor with from_iterable, islice
* Fix nits in test
- refactor structured_logs fixtures in TestLogStreamAccumulator
- use f-strign in test_file_task_handler
- assert actual value of _create_sort_key
- add details comments in test__add_log_from_parsed_log_streams_to_heap
* Refactor test_utils
* Add comment for lazy initialization
* Fix error handling for _stream_lines_by_chunk
* Fix mypy error after merge
* Fix final review nits
* Fix mypy error
| if AIRFLOW_V_3_0_PLUS: | ||
| return list(self._event_to_dict(e) for e in events) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jason810496 This line means that logs from Cloudwatch are now showing as raw-json strings rather then rendered nicely. What was the thinking here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What was the thinking here?
After rethink, the removal of compact here is wrong.
For the other remote handler, the change should be like following:
if AIRFLOW_V_3_0_PLUS:
- return missing_log_message, metadata
+ from airflow.utils.log.file_task_handler import StructuredLogMessage
+
+ return [StructuredLogMessage(event=missing_log_message)], metadatamissing_log_message is str, and this PR was merged as 3.0.3 plus
- For
3.0.0-3.0.2, the return type of logs isstr - For
3.0.3 plusthe return type of logs islist[StructuredLogMessage]
The refactored FileTaskHandler does handle both cases.
I think this mistake removal is because AWS cloudwatch handler is the first remote handler I work on and there are some changes in the development iteration then I forget to update the logic of cloudwatch handler after the changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jason810496 A sort of fix (but not a complete one) that I knocked up astronomer@afbf6f5
Could you finish it off and get logs for cloudwatch working properly please.
I was using https://www.localstack.cloud/ (the free version) and I set these env vars in breeze to test it
AWS_ENDPOINT_URL=http://localstack-main:4566
AWS_ACCESS_KEY_ID="test" AWS_SECRET_ACCESS_KEY="test"
AWS_DEFAULT_REGION="us-east-1"
AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER=cloudwatch://arn:aws:logs:us-east-1:000000000000:log-group:test
AIRFLOW__LOGGING__REMOTE_LOGGING=True
(And then create the log group via the localstack gui, or using the AWS cli)
We also need to test latest provider against 3.0.2 and main airflow with released Amazon provider version.
| event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc) | ||
| formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3] | ||
| # Format a datetime object to a string in Zulu time without milliseconds. | ||
| formatted_event_dt = event_dt.strftime("%Y-%m-%dT%H:%M:%SZ") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto: why did you change this? This seems to be unrealted to OOM behavoviour, and should have been in a different PR if it was going to be changed.
Please keep changes focused to a single thing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, this is my mistake, I thought the behavior with the change should be same but it turn out to be the root cause of why the cloudwatch handler is not working properly.
| LogMessages: TypeAlias = list["StructuredLogMessage"] | list[str] | ||
| """The log messages themselves, either in already sturcutured form, or a single string blob to be parsed later""" | ||
| LogMessages: TypeAlias = list[str] | ||
| """The legacy format of log messages before 3.0.2""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@jason810496 This is before 3.0.3 now right? I think we didn't include this in 3.0.3 in the end
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, this patch will be included in 3.0.4 release.
|
@jason810496 can you please look into cloudwtach display bug |
Yes, I'm still trying to connect airflow with localstack to test the system level behavior. |

related issue: #45079
related PR: #45129
related discussion on slack: https://apache-airflow.slack.com/archives/CCZRF2U5A/p1736767159693839
Why
In short, this PR aims to eliminate OOM issues by:
yieldgenerators instead of returning a list of strings)More detailed reasoning is already described in the linked issue.
Due to too many conflicts with the old PR (#45129), this PR reworks the changes on top of the latest
FileTaskHandler.What
This PR ports the original changes from #45129 to the current version of
FileTaskHandlerwith the following updates:Note: Recent Changes in
FileTaskHandlerStructuredLogMessageto represent each log record Render structured logs in the new UI rather than showing raw JSON #46827RemoteLogIOinterface for remote log handling Rework remote task log handling for the structlog era. #48491